/*
 * Copyright (c) 2001 Sun Microsystems, Inc.  All rights
 * reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 *
 * 3. The end-user documentation included with the redistribution,
 *    if any, must include the following acknowledgment:
 *       "This product includes software developed by the
 *       Sun Microsystems, Inc. for Project JXTA."
 *    Alternately, this acknowledgment may appear in the software itself,
 *    if and wherever such third-party acknowledgments normally appear.
 *
 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
 *    not be used to endorse or promote products derived from this
 *    software without prior written permission. For written
 *    permission, please contact Project JXTA at http://www.jxta.org.
 *
 * 5. Products derived from this software may not be called "JXTA",
 *    nor may "JXTA" appear in their name, without prior written
 *    permission of Sun.
 *
 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 *====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of Project JXTA.  For more
 * information on Project JXTA, please see
 * <http://www.jxta.org/>.
 *
 * This license is based on the BSD license adopted by the Apache Foundation.
 *
 * $Id: GetContentRequest.java,v 1.23 2006/02/07 20:43:51 bondolo Exp $
 *
 */

package net.jxta.share.client;

import java.io.File;
import java.io.InputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.BitSet;
import java.util.Hashtable;
import java.util.Enumeration;

import net.jxta.endpoint.ByteArrayMessageElement;
import net.jxta.endpoint.EndpointAddress;
import net.jxta.endpoint.EndpointListener;
import net.jxta.endpoint.Messenger;
import net.jxta.endpoint.Message;
import net.jxta.peergroup.PeerGroup;
import net.jxta.share.CMS;
import net.jxta.share.ContentAdvertisement;

import org.apache.log4j.Logger;
import org.apache.log4j.Level;
/**
 * This class implements Get Content Request.
 * It request the given content from the given CMS request pipe.
 */

public class GetContentRequest implements EndpointListener, Runnable {

	private final static transient Logger LOG = Logger.getLogger(GetContentRequest.class.getName());

	private PeerGroup group = null;
	private EndpointAddress address = null;
	private ContentAdvertisement cAdv = null;
	private Hashtable targetTable = new Hashtable();
	private File destFile = null;

	private Thread thread = null;

	private volatile boolean isDone = false;
	private boolean hasFailed = false;

	private long contentLength = 0;
	private int chunkSize = 0;
	private int numberChunks = 0;
	private int numberReceived = 0;
	private BitSet bitSet = null;
	private BitSet bitSetReq = null;

	protected int maxParallelDownload = 10;
	protected int maxRetry = 3;
	protected long timeout = 20 * 1000; // 20sec * 3 = 1min
	protected int chunksAtOneTime = 10;

	private static int requestCounter = 0;
	private static Object counterSynch = new Object();

	public static int getUnusedRequestID() {
		synchronized (counterSynch) {
			return requestCounter++;
		}
	}

	/**
	 * Create a GetContentRequest to download remotely shared content to disk.
	 * The download will start immediately after this constructor is called.
	 *
	 *@param group the peergroup in which the content is being shared
	 *@param cAdv the advertisement for the content
	 *@param inFile the file in which to save the content 
	 */
	public GetContentRequest(
		PeerGroup group,
		ContentAdvertisement cAdv,
		File inFile) {
		this(group, new ContentAdvertisement[] { cAdv }, inFile);
	}

	/**
	 * Similar to
	 * {@link #GetContentRequest(PeerGroup, ContentAdvertisement, File)}
	 * , except the content is downloaded from multiple sources.
	 *@param cAdvs advertisements from the various sources that the content is
	 * to be downloaded from.
	 */
	public GetContentRequest(
		PeerGroup group,
		ContentAdvertisement[] cAdvs,
		File inFile) {
		if (cAdvs.length < 1) {
			isDone = true;
			hasFailed = true;
			notifyFailure();
			return;
		}

		this.group = group;
		this.cAdv = cAdvs[0];
		destFile = inFile;

		// register the message listener
		String addressStr = null;

		addressStr =
			"jxta://"
				+ CMS.getPeerId(group)
				+ "/CMS-GetRequest"
				+ Integer.toString(getUnusedRequestID())
				+ "/"
				+ CMS.getGroupId(group);

		if (LOG.isEnabledFor(Level.DEBUG))
			LOG.debug(
				"ContentName = '"
					+ cAdv.getName()
					+ "' reply address = "
					+ addressStr);
		if (LOG.isEnabledFor(Level.DEBUG))
			LOG.debug("save content to '" + inFile.getPath() + "'");
		address = new EndpointAddress(addressStr);

		if (LOG.isEnabledFor(Level.DEBUG))
			LOG.debug(
				"register address = "
					+ address.getServiceName()
					+ address.getServiceParameter());
		group.getEndpointService().addIncomingMessageListener(
			this,
			address.getServiceName(),
			address.getServiceParameter());

		for (int i = 0; i < cAdvs.length; i++) {
			//use a unique request ID for each of the sources
			String requestid = address.getServiceName() + ":" + i;

			//store the status information of each of the parallel downloads
			// in the targetTable
			targetTable.put(requestid, new TargetItem(cAdvs[i]));
		}

		thread = new Thread(this);
		thread.start();
	}

	public void run() {
		while (!isDone) {
			int counter = 0;
			Enumeration en = targetTable.keys();
			while (en.hasMoreElements()) {
				String requestid = (String) en.nextElement();
				TargetItem ti = (TargetItem) targetTable.get(requestid);
				if (counter >= maxParallelDownload) {
					break;
				}
				if (ti.errorcount >= maxRetry) {
					continue;
				}
				if (ti.timestamp > 0
					&& ti.timestamp + timeout < System.currentTimeMillis()) {
					clearBitSetReq(ti.bsReq);
					ti.errorcount++;
				}
				gcBitSetReq(ti.bsReq);
				if (ti.errorcount == 0
					&& countBitSet(ti.bsReq) >= chunksAtOneTime) {
					counter++;
					continue;
				}

				boolean ok = sendRequest(requestid, ti);
				ti.timestamp = System.currentTimeMillis();
				if (ok) {
					counter++;
				} else {
					ti.errorcount++;
				}
			}

			if (counter == 0) {
				isDone = true;
				group.getEndpointService().removeIncomingMessageListener(
					address.getServiceName(),
					address.getServiceParameter());
				trimFile();
				notifyFailure();
			}

			if (bitSetReq != null) {
				gcBitSetReq(bitSetReq);
			}

			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
			}
		}
	}

	private boolean sendRequest(String requestid, TargetItem ti) {
		ContentAdvertisement cAdv = ti.cAdv;
		// get the endpoint messenger for the result Peer
		if (LOG.isEnabledFor(Level.DEBUG))
			LOG.debug("GetContentRequest " + cAdv.getAddress());
		EndpointAddress destAddr = new EndpointAddress(cAdv.getAddress());

		long rangeBegin = 0;
		long rangeEnd = 64 * 1024 - 1;
		if (bitSetReq != null) {
			int chunkBegin = -1;
			int chunkEnd = -1;
			for (int i = 0; i < numberChunks; i++) {
				if (chunkBegin >= 0) {
					if (bitSet.get(i) || bitSetReq.get(i)) {
						chunkEnd = i - 1;
						break;
					}
				} else {
					if (!bitSet.get(i) && !bitSetReq.get(i)) {
						chunkBegin = i;
					}
				}
			}
			if (chunkBegin == -1) {
				// can happen?
				clearBitSetReq(bitSetReq);
				return true;
			}
			if (chunkEnd == -1) {
				chunkEnd = numberChunks - 1;
			}
			if (chunkEnd > chunkBegin + chunksAtOneTime - 1) {
				chunkEnd = chunkBegin + chunksAtOneTime - 1;
			}
			for (int i = chunkBegin; i <= chunkEnd; i++) {
				bitSetReq.set(i);
				ti.bsReq.set(i);
			}

			rangeBegin = chunkBegin * chunkSize;
			rangeEnd = (chunkEnd + 1) * chunkSize - 1;
			if (rangeEnd >= contentLength) {
				rangeEnd = contentLength - 1;
			}
		}

		try {
			Messenger messenger =
				group.getEndpointService().getMessenger(destAddr);

			// send the request
			Message message = new Message();

			message.addMessageElement(
				new ByteArrayMessageElement(
					CMS.MESSAGE_TYPE,
					CMS.encodeAs,
					CMS.GET_REQUEST.getBytes(),
					null));

			//message.setBytes(CMS.MESSAGE_TYPE, CMS.GET_REQUEST.getBytes());

			message.addMessageElement(
				new ByteArrayMessageElement(
					CMS.REQUEST_ID,
					CMS.encodeAs,
					requestid.getBytes(),
					null));

			//message.setBytes(CMS.REQUEST_ID, requestid.getBytes());

			message.addMessageElement(
				new ByteArrayMessageElement(
					CMS.CONTENT_ID,
					CMS.encodeAs,
					cAdv.getContentId().toString().getBytes(),
					null));

			//			message.setBytes(
			//				CMS.CONTENT_ID,
			//				cAdv.getContentId().toString().getBytes());

			message.addMessageElement(
				new ByteArrayMessageElement(
					CMS.RETURN_ADDRESS,
					CMS.encodeAs,
					address.toString().getBytes(),
					null));

//			message.setBytes(CMS.RETURN_ADDRESS, address.toString().getBytes());

			message.addMessageElement(
				new ByteArrayMessageElement(
					CMS.RANGE_BEGIN
					,CMS.encodeAs
					,Long.toString(rangeBegin).getBytes()
					,null));

//			message.setBytes(
//				CMS.RANGE_BEGIN,
//				Long.toString(rangeBegin).getBytes());

			message.addMessageElement(
				new ByteArrayMessageElement(
					CMS.RANGE_END
					,CMS.encodeAs
					,Long.toString(rangeEnd).getBytes()
					,null));

//			message.setBytes(CMS.RANGE_END, Long.toString(rangeEnd).getBytes());

			messenger.sendMessage(message);
		} catch (Exception e) {
			if (LOG.isEnabledFor(Level.DEBUG))
				LOG.debug("could not send GET request", e);
			return false;
		}
		return true;
	}

	public void processIncomingMessage(
		Message message,
		EndpointAddress srcAddr,
		EndpointAddress dstAddr) {

		if (LOG.isEnabledFor(Level.DEBUG))
			LOG.debug("processIncomingMessage()");

		if (message != null) {
			// Handle the message
			String messageType = null;
			String mRid = null;
			String mCid = null;
			try {
				messageType = CMS.popString(message, CMS.MESSAGE_TYPE);
				mRid = CMS.popString(message, CMS.REQUEST_ID);
				mCid = CMS.popString(message, CMS.CONTENT_ID);

				TargetItem ti = (TargetItem) targetTable.get(mRid);

				if (CMS.GET_RESULT.equals(messageType)
					&& ti != null
					&& cAdv.getContentId().toString().equals(mCid)) {

					ti.timestamp = System.currentTimeMillis();
					ti.errorcount = 0;
					handleMessage(message);
				}
			} catch (IOException e) {
				if (LOG.isEnabledFor(Level.DEBUG))
					LOG.debug("could not handle message", e);
			}
		}
	}

	public void cancel() {
		isDone = true;
		if (null != thread) {
			thread.interrupt();
		}

		group.getEndpointService().removeIncomingMessageListener(
			address.getServiceName(),address.getServiceParameter());

		trimFile();
	}

	private synchronized void trimFile() {
		if (hasFailed) {
			return;
		}
		hasFailed = true;

		if (bitSet == null) {
			return;
		}
		if (numberReceived >= numberChunks) {
			return;
		}
		int count = 0;
		while (bitSet.get(count)) {
			count++;
		}
		if (count * chunkSize > contentLength) {
			return;
		}
		try {
			RandomAccessFile op = new RandomAccessFile(destFile, "rw");
			op.setLength(count * chunkSize);
			op.close();
		} catch (Exception e) {
			if (LOG.isEnabledFor(Level.DEBUG))
				LOG.debug("could not trim file " + destFile, e);
			destFile.delete();
			return;
		}

	}

	public boolean isDone() {
		return isDone;
	}

	public boolean hasFailed() {
		return hasFailed;
	}

	public int getPercentDone() {
		if (0 == numberChunks) {
			return 0;
		}

		return (numberReceived * 100) / numberChunks;
	}

	/**
	 * These notify methods are to be overridden by a subclass that 
	 * wants to track the progress of the request
	 */
	public void notifyDone() {
	}

	public void notifyFailure() {
	}

	public void notifyUpdate(int percentage) {
	}

	public File getFile() {
		return destFile;
	}

	private synchronized void handleMessage(Message message) {
		long length = 0;
		long offset = 0;
		int size = 0;

		// extract the content length
		try {
			length = popLong(message, CMS.CONTENT_LENGTH);
		} catch (IOException e) {
			if (LOG.isEnabledFor(Level.DEBUG))
				LOG.debug("could not pop CONTENT_LENGTH tag", e);
			return;
		} catch (NumberFormatException e) {
			if (LOG.isEnabledFor(Level.DEBUG))
				LOG.debug("could not parse CONTENT_LENGTH tag", e);
			return;
		}

		// zero-sized content (special case)
		if (length == 0) {
			try {
				RandomAccessFile op = new RandomAccessFile(destFile, "rw");
				op.setLength(0);
				op.close();
			} catch (Exception e) {
				if (LOG.isEnabledFor(Level.DEBUG))
					LOG.debug("could not create file " + destFile, e);
				return;
			}

			isDone = true;
			notifyDone();

			group.getEndpointService().removeIncomingMessageListener(
				address.getServiceName(),address.getServiceParameter());

			return;
		}

		// extract the content offset
		try {
			offset = popLong(message, CMS.CHUNK_OFFSET);
		} catch (IOException e) {
			if (LOG.isEnabledFor(Level.DEBUG))
				LOG.debug("could not pop CHUNK_OFFSET tag", e);
			return;
		} catch (NumberFormatException e) {
			if (LOG.isEnabledFor(Level.DEBUG))
				LOG.debug("could not parse CHUNK_OFFSET tag", e);
			return;
		}

		// extract the chunk size
		try {
			size = popInt(message, CMS.CHUNK_SIZE);
		} catch (IOException e) {
			if (LOG.isEnabledFor(Level.DEBUG))
				LOG.debug("could not pop CHUNK_SIZE tag", e);
			return;
		} catch (NumberFormatException e) {
			if (LOG.isEnabledFor(Level.DEBUG))
				LOG.debug("could not parse CHUNK_SIZE tag", e);
			return;
		}

		// check if the bitSet has been created
		if (bitSet == null) {
			numberChunks = (int) (length / size);
			// check for case where there is a particle last chunk

			if (size * numberChunks < length) {
				numberChunks++;
			}

			// this is the first chunk
			bitSet = new BitSet(numberChunks);

			contentLength = length;
			chunkSize = size;

			// check received chunks
			long fileLength = destFile.length();
			for (int i = 0;(i + 1) * chunkSize <= fileLength; i++) {
				numberReceived++;
				bitSet.set(i);
			}

			// for request
			bitSetReq = new BitSet(numberChunks);
		}

		if (contentLength != length) {
			// not compatible
			if (LOG.isEnabledFor(Level.DEBUG))
				LOG.debug("incompatible content length");
			return;
		}

		// check if this chunk has been received already
		int chunkNumber = (int) (offset / chunkSize);
		if (bitSet.get(chunkNumber)) {
			// already seen this chunk
			return;
		}

		if (chunkNumber < numberChunks - 1 && chunkSize != size) {
			// not compatible
			if (LOG.isEnabledFor(Level.DEBUG))
				LOG.debug("incompatible chunk size");
			return;
		}

		if (chunkNumber * chunkSize != offset) {
			// not compatible
			if (LOG.isEnabledFor(Level.DEBUG))
				LOG.debug("invalid offset");
			return;
		}

		if (isDone) {
			return;
		}

		InputStream chunkStream = null;

		try {
			chunkStream = message.getMessageElement(CMS.CHUNK_DATA).getStream();
		} catch (IOException e) {
			if (LOG.isEnabledFor(Level.DEBUG)) {
				LOG.debug("could not pop CHUNK_DATA tag", e);
			}
			return;
		}
		RandomAccessFile op = null;
		try {
			op = new RandomAccessFile(destFile, "rw");
		} catch (Exception e) {
			if (LOG.isEnabledFor(Level.DEBUG))
				LOG.debug("could not create file " + destFile, e);
			return;
		}

		try {
			int available = chunkStream.available();
			byte[] buffer = new byte[available];

			int res = chunkStream.read(buffer);
			op.seek(offset);
			op.write(buffer, 0, res);
		} catch (IOException e) {
			if (LOG.isEnabledFor(Level.DEBUG))
				LOG.debug("could not write to file " + destFile, e);
			return;
		}

		try {
			op.close();
			chunkStream.close();
		} catch (IOException e) {
			if (LOG.isEnabledFor(Level.DEBUG))
				LOG.debug("could not close file " + destFile, e);
		}

		// set the chunk as seen
		bitSet.set(chunkNumber);

		numberReceived++;
		notifyUpdate(getPercentDone());
		if (numberReceived >= numberChunks) {
			isDone = true;
			notifyDone();

			group.getEndpointService().removeIncomingMessageListener(
				address.getServiceName(),address.getServiceParameter());
		}
	}

	private static long popLong(Message message, String tag)
		throws IOException, NumberFormatException {
		String str = CMS.popString(message, tag);
		return Long.parseLong(str);
	}

	private static int popInt(Message message, String tag)
		throws IOException, NumberFormatException {
		String str = CMS.popString(message, tag);
		return Integer.parseInt(str);
	}

	public void finalize() {
		cancel();
	}

	private int countBitSet(BitSet bs) {
		int c = 0;
		for (int i = 0; i < bs.length(); i++) {
			if (bs.get(i)) {
				c++;
			}
		}
		return c;
	}

	private void clearBitSetReq(BitSet bs) {
		for (int i = 0; i < bs.length(); i++) {
			if (bs.get(i)) {
				bs.clear(i);
				if (bitSetReq != null && bitSetReq != bs) {
					bitSetReq.clear(i);
				}
			}
		}
	}

	// to clear received chunk
	// to find a chunk hole, better way?
	private void gcBitSetReq(BitSet bs) {
		for (int i = 0; i < bs.length(); i++) {
			if (bitSet.get(i)) {
				bs.clear(i);
				if (bitSetReq != null && bitSetReq != bs) {
					bitSetReq.clear(i);
				}
			} else if (
				bs == bitSetReq && bitSet.get(i + 1) && i < numberReceived) {
				bitSetReq.clear(i);
			}
		}
	}

	class TargetItem {
		ContentAdvertisement cAdv = null;
		long timestamp = 0;
		int errorcount = 0;
		BitSet bsReq = new BitSet();
		TargetItem(ContentAdvertisement c) {
			cAdv = c;
		}
	}

}
